# SPDX-FileCopyrightText: 2022-present deepset GmbH <info@deepset.ai>
#
# SPDX-License-Identifier: Apache-2.0

import json
import os
from datetime import datetime
from typing import Any, Optional, Union

from openai import AsyncOpenAI, AsyncStream, OpenAI, Stream
from openai.lib._pydantic import to_strict_json_schema
from openai.types.responses import ParsedResponse, Response, ResponseOutputRefusal, ResponseStreamEvent
from pydantic import BaseModel

from haystack import component, default_from_dict, default_to_dict, logging
from haystack.components.generators.utils import _serialize_object
from haystack.dataclasses import (
    AsyncStreamingCallbackT,
    ChatMessage,
    ComponentInfo,
    ImageContent,
    ReasoningContent,
    StreamingCallbackT,
    StreamingChunk,
    SyncStreamingCallbackT,
    TextContent,
    ToolCall,
    ToolCallDelta,
    select_streaming_callback,
)
from haystack.tools import (
    ToolsType,
    _check_duplicate_tool_names,
    deserialize_tools_or_toolset_inplace,
    flatten_tools_or_toolsets,
    serialize_tools_or_toolset,
    warm_up_tools,
)
from haystack.utils import Secret, deserialize_callable, deserialize_secrets_inplace, serialize_callable
from haystack.utils.http_client import init_http_client

logger = logging.getLogger(__name__)


@component
class OpenAIResponsesChatGenerator:
    """
    Completes chats using OpenAI's Responses API.

    It works with gpt-4 and o-series models and supports streaming responses
    from the OpenAI API. It uses the [ChatMessage](https://docs.haystack.deepset.ai/docs/chatmessage)
    format for input and output.

    You can customize how the text is generated by passing parameters to the
    OpenAI API. Use the `**generation_kwargs` argument when you initialize
    the component or when you run it. Any parameter that works with
    `client.responses.create` will work here too.

    For details on OpenAI API parameters, see
    [OpenAI documentation](https://platform.openai.com/docs/api-reference/responses).

    ### Usage example

    ```python
    from haystack.components.generators.chat import OpenAIResponsesChatGenerator
    from haystack.dataclasses import ChatMessage

    messages = [ChatMessage.from_user("What's Natural Language Processing?")]

    client = OpenAIResponsesChatGenerator(generation_kwargs={"reasoning": {"effort": "low", "summary": "auto"}})
    response = client.run(messages)
    print(response)
    ```
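
    To stream tokens as they arrive, pass a streaming callback. A minimal sketch using
    Haystack's built-in `print_streaming_chunk` helper:

    ```python
    from haystack.components.generators.utils import print_streaming_chunk

    streaming_client = OpenAIResponsesChatGenerator(streaming_callback=print_streaming_chunk)
    streaming_client.run([ChatMessage.from_user("Summarize NLP in one sentence.")])
    ```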
    """

    def __init__(
        self,
        *,
        api_key: Secret = Secret.from_env_var("OPENAI_API_KEY"),
        model: str = "gpt-5-mini",
        streaming_callback: Optional[StreamingCallbackT] = None,
        api_base_url: Optional[str] = None,
        organization: Optional[str] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        timeout: Optional[float] = None,
        max_retries: Optional[int] = None,
        tools: Optional[Union[ToolsType, list[dict]]] = None,
        tools_strict: bool = False,
        http_client_kwargs: Optional[dict[str, Any]] = None,
    ):
        """
        Creates an instance of OpenAIResponsesChatGenerator. Uses OpenAI's gpt-5-mini by default.

        Before initializing the component, you can set the `OPENAI_TIMEOUT` and `OPENAI_MAX_RETRIES`
        environment variables. They are used as fallbacks for the `timeout` and `max_retries`
        parameters when those are not set explicitly.

        :param api_key: The OpenAI API key.
            You can set it with the `OPENAI_API_KEY` environment variable, or pass it via this
            parameter during initialization.
        :param model: The name of the model to use.
        :param streaming_callback: A callback function that is called when a new token is received from the stream.
            The callback function accepts [StreamingChunk](https://docs.haystack.deepset.ai/docs/data-classes#streamingchunk)
            as an argument.
        :param api_base_url: An optional base URL.
        :param organization: Your organization ID, defaults to `None`. See
            [production best practices](https://platform.openai.com/docs/guides/production-best-practices/setting-up-your-organization).
        :param generation_kwargs: Other parameters to use for the model. These parameters are sent
            directly to the OpenAI endpoint.
            See OpenAI [documentation](https://platform.openai.com/docs/api-reference/responses) for
            more details.
            Some of the supported parameters:
            - `temperature`: What sampling temperature to use. Higher values like 0.8 will make the output more random,
                while lower values like 0.2 will make it more focused and deterministic.
            - `top_p`: An alternative to sampling with temperature, called nucleus sampling, where the model
                considers the results of the tokens with top_p probability mass. For example, 0.1 means only the tokens
                comprising the top 10% probability mass are considered.
            - `previous_response_id`: The ID of the previous response.
                Use this to create multi-turn conversations.
            - `text_format`: A Pydantic model that enforces the structure of the model's response.
                If provided, the output will always be validated against this
                format (unless the model returns a tool call).
                For details, see the [OpenAI Structured Outputs documentation](https://platform.openai.com/docs/guides/structured-outputs).
            - `text`: A JSON schema that enforces the structure of the model's response.
                If provided, the output will always be validated against this
                format (unless the model returns a tool call).
                Notes:
                - Both JSON Schema and Pydantic models are supported for the latest models, starting from GPT-4o.
                - If both are provided, `text_format` takes precedence and the JSON schema passed to `text` is ignored.
                - Currently, this component doesn't support streaming for structured outputs.
                - Older models only support a basic version of structured outputs through `{"type": "json_object"}`.
                    For detailed information on JSON mode, see the [OpenAI Structured Outputs documentation](https://platform.openai.com/docs/guides/structured-outputs#json-mode).
            - `reasoning`: A dictionary of parameters for reasoning. For example:
                - `summary`: The reasoning summary setting, for example `"auto"`.
                - `effort`: The level of effort to put into the reasoning. Can be `low`, `medium`, or `high`.
                - `generate_summary`: Whether to generate a summary of the reasoning.
                Note: OpenAI does not return the reasoning tokens, but the summary is included in the
                response if it is enabled.
                For details, see the [OpenAI Reasoning documentation](https://platform.openai.com/docs/guides/reasoning).
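
            As an illustrative sketch, structured output via `text_format` with a hypothetical
            Pydantic model (`CityInfo` is an assumption for the example, not part of this component):

            ```python
            from pydantic import BaseModel

            class CityInfo(BaseModel):
                city: str
                country: str

            generator = OpenAIResponsesChatGenerator(generation_kwargs={"text_format": CityInfo})
            ```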
        :param timeout:
            Timeout for OpenAI client calls. If not set, it defaults to either the
            `OPENAI_TIMEOUT` environment variable, or 30 seconds.
        :param max_retries:
            Maximum number of retries to contact OpenAI after an internal error.
            If not set, it defaults to either the `OPENAI_MAX_RETRIES` environment variable, or 5.
        :param tools:
            The tools that the model can use to prepare calls. This parameter accepts either a mixed
            list of Haystack `Tool` objects and Haystack `Toolset`s, or a list of OpenAI/MCP tool
            definition dictionaries.
            Note: You cannot mix OpenAI/MCP tool definitions and Haystack tools.
            For details on tool support, see [OpenAI documentation](https://platform.openai.com/docs/api-reference/responses/create#responses-create-tools).
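            For example, a sketch passing an OpenAI built-in tool definition (assuming the
            `web_search_preview` tool type is available to your account and model):

            ```python
            generator = OpenAIResponsesChatGenerator(tools=[{"type": "web_search_preview"}])
            ```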
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `False`, the model may not exactly
            follow the schema provided in the `parameters` field of the tool definition. In the Responses API,
            tool calls are strict by default.
        :param http_client_kwargs:
            A dictionary of keyword arguments to configure a custom `httpx.Client` or `httpx.AsyncClient`.
            For more information, see the [HTTPX documentation](https://www.python-httpx.org/api/#client).

        """
        self.api_key = api_key
        self.model = model
        self.generation_kwargs = generation_kwargs or {}
        self.streaming_callback = streaming_callback
        self.api_base_url = api_base_url
        self.organization = organization
        self.timeout = timeout
        self.max_retries = max_retries
        self.tools = tools  # Store tools as-is, whether it's a list or a Toolset
        self.tools_strict = tools_strict
        self.http_client_kwargs = http_client_kwargs

        if timeout is None:
            timeout = float(os.environ.get("OPENAI_TIMEOUT", "30.0"))
        if max_retries is None:
            max_retries = int(os.environ.get("OPENAI_MAX_RETRIES", "5"))

        resolved_api_key = api_key.resolve_value() if isinstance(api_key, Secret) else api_key
        client_kwargs: dict[str, Any] = {
            "api_key": resolved_api_key,
            "organization": organization,
            "base_url": api_base_url,
            "timeout": timeout,
            "max_retries": max_retries,
        }

        self.client = OpenAI(http_client=init_http_client(self.http_client_kwargs, async_client=False), **client_kwargs)
        self.async_client = AsyncOpenAI(
            http_client=init_http_client(self.http_client_kwargs, async_client=True), **client_kwargs
        )
        self._is_warmed_up = False

    def warm_up(self):
        """
        Warm up the OpenAI responses chat generator.

        This will warm up the tools registered in the chat generator.
        This method is idempotent and will only warm up the tools once.
        """
        if not self._is_warmed_up:
            is_openai_tool = bool(self.tools) and isinstance(self.tools, list) and isinstance(self.tools[0], dict)
            # We only warm up Haystack tools, not OpenAI/MCP tool definitions
            # The type ignore is needed because mypy cannot infer the type correctly
            if not is_openai_tool:
                warm_up_tools(self.tools)  # type: ignore[arg-type]
            self._is_warmed_up = True

    def _get_telemetry_data(self) -> dict[str, Any]:
        """
        Data that is sent to Posthog for usage analytics.
        """
        return {"model": self.model}

    def to_dict(self) -> dict[str, Any]:
        """
        Serialize this component to a dictionary.

        :returns:
            The serialized component as a dictionary.
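
        A round-trip sketch:

        ```python
        generator = OpenAIResponsesChatGenerator()
        data = generator.to_dict()
        restored = OpenAIResponsesChatGenerator.from_dict(data)
        ```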
        """
        callback_name = serialize_callable(self.streaming_callback) if self.streaming_callback else None
        generation_kwargs = self.generation_kwargs.copy()
        text_format = generation_kwargs.pop("text_format", None)

        # If the response format is a Pydantic model, it's converted to openai's json schema format
        # If it's already a json schema, it's left as is
        if text_format and isinstance(text_format, type) and issubclass(text_format, BaseModel):
            json_schema = {
                "format": {
                    "type": "json_schema",
                    "name": text_format.__name__,
                    "strict": True,
                    "schema": to_strict_json_schema(text_format),
                }
            }
            # json schema needs to be passed to text parameter instead of text_format
            generation_kwargs["text"] = json_schema

        # OpenAI/MCP tools are passed as a list of dictionaries
        serialized_tools: Union[dict[str, Any], list[dict[str, Any]], None]
        if self.tools and isinstance(self.tools, list) and isinstance(self.tools[0], dict):
            # mypy can't infer that self.tools is list[dict] here
            serialized_tools = self.tools  # type: ignore[assignment]
        else:
            serialized_tools = serialize_tools_or_toolset(self.tools)  # type: ignore[arg-type]

        return default_to_dict(
            self,
            model=self.model,
            streaming_callback=callback_name,
            api_base_url=self.api_base_url,
            organization=self.organization,
            generation_kwargs=generation_kwargs,
            api_key=self.api_key.to_dict(),
            timeout=self.timeout,
            max_retries=self.max_retries,
            tools=serialized_tools,
            tools_strict=self.tools_strict,
            http_client_kwargs=self.http_client_kwargs,
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "OpenAIResponsesChatGenerator":
        """
        Deserialize this component from a dictionary.

        :param data: The dictionary representation of this component.
        :returns:
            The deserialized component instance.
        """
        deserialize_secrets_inplace(data["init_parameters"], keys=["api_key"])

        # we only deserialize the tools if they are Haystack tools
        # because OpenAI tool definitions are not serialized in the same way

        tools = data["init_parameters"].get("tools")
        if tools and (
            (isinstance(tools, dict) and tools.get("type") == "haystack.tools.toolset.Toolset")
            or (isinstance(tools, list) and tools[0].get("type") == "haystack.tools.tool.Tool")
        ):
            deserialize_tools_or_toolset_inplace(data["init_parameters"], key="tools")

        init_params = data.get("init_parameters", {})
        serialized_callback_handler = init_params.get("streaming_callback")

        if serialized_callback_handler:
            data["init_parameters"]["streaming_callback"] = deserialize_callable(serialized_callback_handler)
        return default_from_dict(cls, data)

    @component.output_types(replies=list[ChatMessage])
    def run(
        self,
        messages: list[ChatMessage],
        *,
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        tools: Optional[Union[ToolsType, list[dict]]] = None,
        tools_strict: Optional[bool] = None,
    ):
        """
        Invokes response generation based on the provided messages and generation parameters.

        :param messages:
            A list of ChatMessage instances representing the input messages.
        :param streaming_callback:
            A callback function that is called when a new token is received from the stream.
        :param generation_kwargs:
            Additional keyword arguments for text generation. These parameters will
            override the parameters passed during component initialization.
            For details on OpenAI API parameters, see [OpenAI documentation](https://platform.openai.com/docs/api-reference/responses/create).
        :param tools:
            The tools that the model can use to prepare calls. If set, it overrides the `tools`
            parameter set during component initialization. This parameter accepts either a mixed
            list of Haystack `Tool` objects and Haystack `Toolset`s, or a list of OpenAI/MCP tool
            definition dictionaries.
            Note: You cannot mix OpenAI/MCP tool definitions and Haystack tools.
            For details on tool support, see [OpenAI documentation](https://platform.openai.com/docs/api-reference/responses/create#responses-create-tools).
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `False`, the model may not exactly
            follow the schema provided in the `parameters` field of the tool definition. In the Responses API,
            tool calls are strict by default.
            If set, it overrides the `tools_strict` parameter set during component initialization.

        :returns:
            A dictionary with the following key:
            - `replies`: A list containing the generated responses as ChatMessage instances.
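
        A usage sketch with a Haystack `Tool` (the `get_weather` function and tool are
        illustrative, not part of this component):

        ```python
        from haystack.dataclasses import ChatMessage
        from haystack.tools import Tool

        def get_weather(city: str) -> str:
            return f"Sunny in {city}"

        weather_tool = Tool(
            name="get_weather",
            description="Get the weather for a city.",
            parameters={
                "type": "object",
                "properties": {"city": {"type": "string"}},
                "required": ["city"],
            },
            function=get_weather,
        )

        generator = OpenAIResponsesChatGenerator()
        result = generator.run([ChatMessage.from_user("What's the weather in Paris?")], tools=[weather_tool])
        print(result["replies"][0].tool_calls)
        ```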
        """
        if len(messages) == 0:
            return {"replies": []}

        streaming_callback = select_streaming_callback(
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=False
        )
        responses: Union[Stream[ResponseStreamEvent], Response]

        api_args = self._prepare_api_call(
            messages=messages,
            streaming_callback=streaming_callback,
            generation_kwargs=generation_kwargs,
            tools=tools,
            tools_strict=tools_strict,
        )
        openai_endpoint = api_args.pop("openai_endpoint")
        openai_endpoint_method = getattr(self.client.responses, openai_endpoint)
        responses = openai_endpoint_method(**api_args)

        if streaming_callback is not None:
            response_output = self._handle_stream_response(
                responses,  # type: ignore
                streaming_callback,
            )
        else:
            assert isinstance(responses, Response), "Unexpected response type for non-streaming request."
            response_output = [_convert_response_to_chat_message(responses)]
        return {"replies": response_output}

    @component.output_types(replies=list[ChatMessage])
    async def run_async(
        self,
        messages: list[ChatMessage],
        *,
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        tools: Optional[Union[ToolsType, list[dict]]] = None,
        tools_strict: Optional[bool] = None,
    ):
        """
        Asynchronously invokes response generation based on the provided messages and generation parameters.

        This is the asynchronous version of the `run` method. It has the same parameters and return values
        but can be used with `await` in async code.

        :param messages:
            A list of ChatMessage instances representing the input messages.
        :param streaming_callback:
            A callback function that is called when a new token is received from the stream.
            Must be a coroutine.
        :param generation_kwargs:
            Additional keyword arguments for text generation. These parameters will
            override the parameters passed during component initialization.
            For details on OpenAI API parameters, see [OpenAI documentation](https://platform.openai.com/docs/api-reference/responses/create).
        :param tools:
            The tools that the model can use to prepare calls. If set, it overrides the `tools`
            parameter set during component initialization. This parameter accepts either a mixed
            list of Haystack `Tool` objects and Haystack `Toolset`s, or a list of OpenAI/MCP tool
            definition dictionaries.
            Note: You cannot mix OpenAI/MCP tool definitions and Haystack tools.
        :param tools_strict:
            Whether to enable strict schema adherence for tool calls. If set to `False`, the model may not exactly
            follow the schema provided in the `parameters` field of the tool definition. In the Responses API,
            tool calls are strict by default.
            If set, it overrides the `tools_strict` parameter set during component initialization.

        :returns:
            A dictionary with the following key:
            - `replies`: A list containing the generated responses as ChatMessage instances.
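
        A minimal async sketch with a coroutine streaming callback (the callback is illustrative):

        ```python
        import asyncio

        from haystack.dataclasses import ChatMessage, StreamingChunk

        async def on_chunk(chunk: StreamingChunk) -> None:
            print(chunk.content, end="", flush=True)

        async def main():
            generator = OpenAIResponsesChatGenerator()
            result = await generator.run_async(
                [ChatMessage.from_user("What's Natural Language Processing?")], streaming_callback=on_chunk
            )
            print(result["replies"])

        asyncio.run(main())
        ```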
        """
        if len(messages) == 0:
            return {"replies": []}

        # validate and select the streaming callback
        streaming_callback = select_streaming_callback(
            init_callback=self.streaming_callback, runtime_callback=streaming_callback, requires_async=True
        )
        responses: Union[AsyncStream[ResponseStreamEvent], Response]

        api_args = self._prepare_api_call(
            messages=messages,
            streaming_callback=streaming_callback,
            generation_kwargs=generation_kwargs,
            tools=tools,
            tools_strict=tools_strict,
        )

        openai_endpoint = api_args.pop("openai_endpoint")
        openai_endpoint_method = getattr(self.async_client.responses, openai_endpoint)
        responses = await openai_endpoint_method(**api_args)

        if streaming_callback is not None:
            response_output = await self._handle_async_stream_response(
                responses,  # type: ignore
                streaming_callback,
            )

        else:
            assert isinstance(responses, Response), "Unexpected response type for non-streaming request."
            response_output = [_convert_response_to_chat_message(responses)]
        return {"replies": response_output}

    def _prepare_api_call(  # noqa: PLR0913
        self,
        *,
        messages: list[ChatMessage],
        streaming_callback: Optional[StreamingCallbackT] = None,
        generation_kwargs: Optional[dict[str, Any]] = None,
        tools: Optional[Union[ToolsType, list[dict]]] = None,
        tools_strict: Optional[bool] = None,
    ) -> dict[str, Any]:
        # update generation kwargs by merging with the generation kwargs passed to the run method
        generation_kwargs = {**self.generation_kwargs, **(generation_kwargs or {})}

        # adapt ChatMessage(s) to the format expected by the OpenAI API
        openai_formatted_messages: list[dict[str, Any]] = []
        for message in messages:
            openai_formatted_messages.extend(_convert_chat_message_to_responses_api_format(message))

        tools = tools or self.tools
        tools_strict = tools_strict if tools_strict is not None else self.tools_strict

        openai_tools = {}
        # Build tool definitions
        if tools:
            tool_definitions: list[Any] = []
            if isinstance(tools, list) and isinstance(tools[0], dict):
                # Predefined OpenAI/MCP-style tools
                tool_definitions = tools

            # Convert all tool objects to the correct OpenAI-compatible structure
            else:
                # mypy can't infer that tools is ToolsType here
                flattened_tools = flatten_tools_or_toolsets(tools)  # type: ignore[arg-type]
                _check_duplicate_tool_names(flattened_tools)
                for t in flattened_tools:
                    function_spec = {**t.tool_spec}
                    if not tools_strict:
                        function_spec["strict"] = False
                    function_spec["parameters"]["additionalProperties"] = False
                    tool_definitions.append({"type": "function", **function_spec})

            openai_tools = {"tools": tool_definitions}

        base_args = {"model": self.model, "input": openai_formatted_messages, **openai_tools, **generation_kwargs}

        # we pass an `openai_endpoint` key as a hint to the run method to use the parse or create
        # endpoint; the key is removed before the API call is made.
        # If both `text_format` and `text` are provided, `text_format` takes precedence and the
        # JSON schema passed to `text` is ignored.
        if generation_kwargs.get("text_format") or generation_kwargs.get("text"):
            return {**base_args, "stream": streaming_callback is not None, "openai_endpoint": "parse"}
        return {**base_args, "stream": streaming_callback is not None, "openai_endpoint": "create"}

    def _handle_stream_response(self, responses: Stream, callback: SyncStreamingCallbackT) -> list[ChatMessage]:
        component_info = ComponentInfo.from_component(self)
        chunks: list[StreamingChunk] = []

        for openai_chunk in responses:  # pylint: disable=not-an-iterable
            chunk_delta = _convert_response_chunk_to_streaming_chunk(
                chunk=openai_chunk, previous_chunks=chunks, component_info=component_info
            )
            chunks.append(chunk_delta)
            callback(chunk_delta)
        chat_message = _convert_streaming_chunks_to_chat_message(chunks=chunks)
        return [chat_message]

    async def _handle_async_stream_response(
        self, responses: AsyncStream, callback: AsyncStreamingCallbackT
    ) -> list[ChatMessage]:
        component_info = ComponentInfo.from_component(self)
        chunks: list[StreamingChunk] = []
        async for openai_chunk in responses:  # pylint: disable=not-an-iterable
            chunk_delta = _convert_response_chunk_to_streaming_chunk(
                chunk=openai_chunk, previous_chunks=chunks, component_info=component_info
            )
            chunks.append(chunk_delta)
            await callback(chunk_delta)
        chat_message = _convert_streaming_chunks_to_chat_message(chunks=chunks)
        return [chat_message]


def _convert_response_to_chat_message(responses: Union[Response, ParsedResponse]) -> ChatMessage:
    """
    Converts the non-streaming response from the OpenAI API to a ChatMessage.

    :param responses: The responses returned by the OpenAI API.
    :returns: The ChatMessage.
    """

    tool_calls = []
    reasoning = None
    logprobs: list[dict] = []
    for output in responses.output:
        if isinstance(output, ResponseOutputRefusal):
            logger.warning("OpenAI returned a refusal output: {output}", output=output)
            continue

        if output.type == "message":
            for content in output.content:
                if hasattr(content, "logprobs") and content.logprobs is not None:
                    logprobs.append(_serialize_object(content.logprobs))

        if output.type == "reasoning":
            # openai doesn't return the reasoning tokens, but we can view summary if its enabled
            # https://platform.openai.com/docs/guides/reasoning#reasoning-summaries
            summaries = output.summary
            extra = output.to_dict()
            # we dont need the summary in the extra
            extra.pop("summary")
            reasoning_text = "\n".join([summary.text for summary in summaries if summaries])
            reasoning = ReasoningContent(reasoning_text=reasoning_text, extra=extra)

        elif output.type == "function_call":
            try:
                arguments = json.loads(output.arguments)
                tool_calls.append(
                    ToolCall(
                        id=output.id, tool_name=output.name, arguments=arguments, extra={"call_id": output.call_id}
                    )
                )
            except json.JSONDecodeError:
                logger.warning(
                    "The LLM provider returned a malformed JSON string for tool call arguments. This tool call "
                    "will be skipped. To always generate a valid JSON, set `tools_strict` to `True`. "
                    "Tool call ID: {_id}, Tool name: {_name}, Arguments: {_arguments}",
                    _id=output.id,
                    _name=output.name,
                    _arguments=output.arguments,
                )

    # we save the response as a dict because it contains the response id etc.
    meta = responses.to_dict()

    # remove output from meta because it contains tool calls, reasoning, text etc.
    meta.pop("output")

    if logprobs:
        meta["logprobs"] = logprobs

    chat_message = ChatMessage.from_assistant(
        text=responses.output_text if responses.output_text else None,
        reasoning=reasoning,
        tool_calls=tool_calls,
        meta=meta,
    )

    return chat_message


def _convert_response_chunk_to_streaming_chunk(  # pylint: disable=too-many-return-statements
    chunk: ResponseStreamEvent, previous_chunks: list[StreamingChunk], component_info: Optional[ComponentInfo] = None
) -> StreamingChunk:
    """
    Converts the streaming response chunk from the OpenAI Responses API to a StreamingChunk.

    :param chunk: The chunk returned by the OpenAI Responses API.
    :param previous_chunks: A list of previously received StreamingChunks.
    :param component_info: An optional `ComponentInfo` object containing information about the component that
        generated the chunk, such as the component name and type.
    :returns:
        A StreamingChunk object representing the content of the chunk from the OpenAI Responses API.
    """
    if chunk.type == "response.output_item.added":
        # Responses API always returns reasoning chunks even if there is no summary
        if chunk.item.type == "reasoning":
            reasoning = ReasoningContent(reasoning_text="", extra=chunk.item.to_dict())
            return StreamingChunk(
                content="",
                component_info=component_info,
                index=chunk.output_index,
                reasoning=reasoning,
                start=True,
                meta={"received_at": datetime.now().isoformat()},
            )

        # the function name is only streamed at the start and end of the function call
        if chunk.item.type == "function_call":
            tool_call = ToolCallDelta(
                index=chunk.output_index, id=chunk.item.id, tool_name=chunk.item.name, extra=chunk.item.to_dict()
            )
            return StreamingChunk(
                content="",
                component_info=component_info,
                index=chunk.output_index,
                tool_calls=[tool_call],
                start=True,
                meta={"received_at": datetime.now().isoformat()},
            )

    elif chunk.type == "response.completed":
        # This means a full response is finished
        # If there are tool_calls present in the final output we mark finish_reason as tool_calls otherwise it's
        # marked as stop
        return StreamingChunk(
            content="",
            component_info=component_info,
            finish_reason="tool_calls" if any(o.type == "function_call" for o in chunk.response.output) else "stop",
            meta={**chunk.to_dict(), "received_at": datetime.now().isoformat()},
        )

    elif chunk.type == "response.output_text.delta":
        # Start is determined by checking if this is the first text delta event of a new output_index
        # 1) Check if all previous chunks have different output_index
        # 2) If any chunks do have the same output_index, check if they have content
        # If none of them have content, this is the start of a new text output
        start = all(c.index != chunk.output_index for c in previous_chunks) or all(
            c.content == "" for c in previous_chunks if c.index == chunk.output_index
        )
        return StreamingChunk(
            content=chunk.delta,
            component_info=component_info,
            index=chunk.output_index,
            start=start,
            meta={**chunk.to_dict(), "received_at": datetime.now().isoformat()},
        )

    elif chunk.type == "response.reasoning_summary_text.delta":
        # We remove the delta from the extra because it is already in the reasoning_text
        # Remaining information needs to be saved for chat message
        extra = chunk.to_dict()
        extra.pop("delta")
        reasoning = ReasoningContent(reasoning_text=chunk.delta, extra=extra)
        return StreamingChunk(
            content="",
            component_info=component_info,
            index=chunk.output_index,
            reasoning=reasoning,
            meta={"received_at": datetime.now().isoformat()},
        )

    # the function arguments are streamed in parts
    # function name is not passed in these chunks
    elif chunk.type == "response.function_call_arguments.delta":
        arguments = chunk.delta
        extra = chunk.to_dict()
        extra.pop("delta")
        # tool call argument deltas carry no call_id, so we use item_id, which is the function call id
        tool_call = ToolCallDelta(index=chunk.output_index, id=chunk.item_id, arguments=arguments, extra=extra)
        return StreamingChunk(
            content="",
            component_info=component_info,
            index=chunk.output_index,
            tool_calls=[tool_call],
            meta={"received_at": datetime.now().isoformat()},
        )

    # any other event type is returned as-is, with its payload preserved in meta
    chunk_message = StreamingChunk(
        content="",
        component_info=component_info,
        index=getattr(chunk, "output_index", None),
        meta={**chunk.to_dict(), "received_at": datetime.now().isoformat()},
    )
    return chunk_message


def _convert_streaming_chunks_to_chat_message(chunks: list[StreamingChunk]) -> ChatMessage:
    """
    Connects the streaming chunks into a single ChatMessage.

    :param chunks: The list of all `StreamingChunk` objects.

    :returns: The ChatMessage.
    """

    # Get the full text by concatenating all text chunks
    text = "".join([chunk.content for chunk in chunks])
    logprobs = []
    for chunk in chunks:
        if chunk.meta.get("logprobs"):
            logprobs.append(chunk.meta.get("logprobs"))

    # Gather reasoning information if present
    reasoning_id = None
    reasoning_text = ""
    for chunk in chunks:
        if chunk.reasoning:
            reasoning_text += chunk.reasoning.reasoning_text
            if chunk.reasoning.extra.get("id"):
                reasoning_id = chunk.reasoning.extra.get("id")

    # Process tool calls if present in any chunk
    tool_call_data: dict[str, dict[str, Any]] = {}  # Track tool calls by id
    for chunk in chunks:
        if chunk.tool_calls:
            for tool_call in chunk.tool_calls:
                # here the tool_call.id is fc_id not call_id
                assert tool_call.id is not None
                # We use the tool call id to track the tool call across chunks
                if tool_call.id not in tool_call_data:
                    tool_call_data[tool_call.id] = {"name": "", "arguments": ""}

                if tool_call.arguments is not None:
                    tool_call_data[tool_call.id]["arguments"] += tool_call.arguments

                # We capture the tool name from one of the chunks
                if tool_call.tool_name is not None:
                    tool_call_data[tool_call.id]["name"] = tool_call.tool_name

                # We capture the call_id from one of the chunks
                if tool_call.extra and "call_id" in tool_call.extra:
                    tool_call_data[tool_call.id]["extra"] = {"call_id": tool_call.extra["call_id"]}

    # Convert accumulated tool call data into ToolCall objects
    tool_calls = []
    sorted_keys = sorted(tool_call_data.keys())
    for key in sorted_keys:
        tool_call_dict = tool_call_data[key]
        try:
            arguments = json.loads(tool_call_dict.get("arguments", "{}")) if tool_call_dict.get("arguments") else {}
            extra: dict[str, Any] = tool_call_dict.get("extra", {})
            tool_calls.append(ToolCall(id=key, tool_name=tool_call_dict["name"], arguments=arguments, extra=extra))
        except json.JSONDecodeError:
            logger.warning(
                "The LLM provider returned a malformed JSON string for tool call arguments. This tool call "
                "will be skipped. To always generate a valid JSON, set `tools_strict` to `True`. "
                "Tool call ID: {_id}, Tool name: {_name}, Arguments: {_arguments}",
                _id=key,
                _name=tool_call_dict["name"],
                _arguments=tool_call_dict["arguments"],
            )

    # We dump the entire final response into meta to be consistent with non-streaming response
    final_response = chunks[-1].meta.get("response") or {}
    final_response.pop("output", None)
    if logprobs:
        final_response["logprobs"] = logprobs

    # Add reasoning content if both id and text are available
    reasoning = None
    if reasoning_id and reasoning_text:
        reasoning = ReasoningContent(reasoning_text=reasoning_text, extra={"id": reasoning_id, "type": "reasoning"})

    return ChatMessage.from_assistant(
        text=text or None, tool_calls=tool_calls, meta=final_response, reasoning=reasoning
    )


def _convert_chat_message_to_responses_api_format(message: ChatMessage) -> list[dict[str, Any]]:
    """
    Convert a ChatMessage to the dictionary format expected by OpenAI's Responses API.

    :param message: The ChatMessage to convert to OpenAI's Responses API format.
    :returns:
        The ChatMessage in the format expected by OpenAI's Responses API.

    :raises ValueError:
        If the message format is invalid.
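
    For example, a plain user message converts to a single-item list (a sketch based on the
    handling below):

    ```python
    _convert_chat_message_to_responses_api_format(ChatMessage.from_user("Hi"))
    # -> [{"role": "user", "content": "Hi"}]
    ```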
    """
    text_contents = message.texts
    tool_calls = message.tool_calls
    tool_call_results = message.tool_call_results
    images = message.images
    reasonings = message.reasonings

    if not text_contents and not tool_calls and not tool_call_results and not images and not reasonings:
        raise ValueError(
            "A `ChatMessage` must contain at least one `TextContent`, `ToolCall`, `ToolCallResult`, "
            "`ImageContent`, or `ReasoningContent`."
        )
    if len(tool_call_results) > 0 and len(message._content) > 1:
        raise ValueError(
            "For OpenAI compatibility, a `ChatMessage` with a `ToolCallResult` cannot contain any other content."
        )

    formatted_messages: list[dict[str, Any]] = []
    openai_msg: dict[str, Any] = {"role": message._role.value}
    if message._name is not None:
        openai_msg["name"] = message._name

    # user message
    if message._role.value == "user":
        if len(message._content) == 1 and isinstance(message._content[0], TextContent):
            openai_msg["content"] = message.text
            return [openai_msg]

        # if the user message contains a list of text and images, OpenAI expects a list of dictionaries
        content = []
        for part in message._content:
            if isinstance(part, TextContent):
                text_type = "input_text"
                content.append({"type": text_type, "text": part.text})
            elif isinstance(part, ImageContent):
                image_item: dict[str, Any]
                image_item = {
                    "type": "input_image",
                    # If no MIME type is provided, default to JPEG.
                    # OpenAI API appears to tolerate MIME type mismatches.
                    "image_url": f"data:{part.mime_type or 'image/jpeg'};base64,{part.base64_image}",
                }

                content.append(image_item)

        openai_msg["content"] = content
        return [openai_msg]

    # tool message
    if tool_call_results:
        formatted_tool_results = []
        for result in tool_call_results:
            if result.origin.id is not None:
                tool_result = {
                    "type": "function_call_output",
                    "call_id": result.origin.extra.get("call_id") if result.origin.extra else "",
                    "output": result.result,
                }
                formatted_tool_results.append(tool_result)
        formatted_messages.extend(formatted_tool_results)

    # Note: the API expects a reasoning id even if there is no reasoning text
    # function calls without reasoning ids are not supported by the API
    if reasonings:
        formatted_reasonings = []
        for reasoning in reasonings:
            reasoning_item = {
                **(reasoning.extra),
                "summary": [{"text": reasoning.reasoning_text, "type": "summary_text"}],
            }
            formatted_reasonings.append(reasoning_item)
        formatted_messages.extend(formatted_reasonings)

    if tool_calls:
        formatted_tool_calls = []
        for tc in tool_calls:
            openai_tool_call = {
                "type": "function_call",
                "name": tc.tool_name,
                # We disable ensure_ascii so special chars like emojis are not converted
                "arguments": json.dumps(tc.arguments, ensure_ascii=False),
                "id": tc.id,
                "call_id": tc.extra.get("call_id") if tc.extra else "",
            }

            formatted_tool_calls.append(openai_tool_call)
        formatted_messages.extend(formatted_tool_calls)

    # system and assistant messages

    if text_contents:
        openai_msg["content"] = " ".join(text_contents)
        formatted_messages.append(openai_msg)

    return formatted_messages
